Kinesis Data Streamsのイベントを処理する Lambda Functionを作成してみた。

Kinesis Data Streamsのイベントを処理する Lambda Functionを作成してみた。

Clock Icon2021.12.16

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

Kinesis Data Streamsとは?

Kinesis Data Streams はサーバーレスデータストリーミングサービスです。Amazon Kinesis Data Streams を使用して、大規模なデータを収集し、処理することができます。Kinesis Data Streamsで、プロデューサーはKinesis データストリームにデータをプッシュし、コンシューマーはリアルタイムにデータを処理します。

この記事では、Kinesis Data Streamsのイベントを処理する Lambda Functionを作成してみました。データレコードがストリームに書き込まれると、Lambda Functionが呼び出され、データが処理されます。

 

 

やってみた

IAM ロールの作成

  • LambdaのIAMロールを作成しておきます。
  • この設定でIAMロールを作成しておきます。
    • 信頼されたエンティティの種類 : AWS のサービス
    • ユースケース : AWS Lambda
    • ポリシー  :  AWSLambdaKinesisExecutionRole
    • ロールの名前 :  Lambda-Kinesis-Role
  • このIAMロールは、Lambda Functionを作成するときに使用されます。

 

Lambda Functionの作成

  • AWS Lambdaコンソールで、Create functionを選択しておきます。
  • 関数名を入力して、RuntineでNode.jsを選択して、前の手順で作成したIAMロールを選択して、関数を作成しておきます。

 

 

  • 機能コードを次のコードに置き換えます。必要に応じてコードを変更できます。Kinesisデータはbase64でエンコードされているため、デコードしておきます。
exports.handler = function(event, context) {
    event.Records.forEach(function(record) {
        var data = Buffer.from(record.kinesis.data, 'base64').toString('ascii');
        console.log('Decoded data:', data);
    });
};

 

Kinesis Data Streamの作成

  • Amazon Kinesisコンソールで、Kinesis Data Streamsを選択して、Create data streamをクリックしておきます。

 

 

  • 次の設定でData Streamを作成しておきます。
    • Data stream capacity : Provisioned
    • Provisioned shards : 1

 

 

Lambda でイベントソースを追加する

  • CLIで、関数名とデータストリームArnを使用して次のコマンドを実行しておきます。
aws lambda create-event-source-mapping --function-name Lambda-Kinesis \
--event-source  arn:aws:kinesis:us-east-1:000000000:stream/Kinesis-Lambda-stream \
--batch-size 100 --starting-position LATEST --region us-east-1

 

  • イベントソースマッピングが作成されました。

 

データレコードを Kinesis Data Streamに追加する

  • CLIで次のコマンドを使用して、データレコードをデータストリームに追加しておきます。
aws kinesis put-record --stream-name Kinesis-Lambda-stream --partition-key 1 \
--data "Hello" --region us-east-1

 

 

  • 「Invalid base64」エラーが発生した場合は、[--cli-binary-format raw-in-base64-out ]をコマンドと一緒に使用してください。
aws kinesis put-record --stream-name Kinesis-Lambda-stream --partition-key 1 \
--data "Hello" --cli-binary-format raw-in-base64-out --region us-east-1

 

  • データレコードが追加されると、Lambda関数が呼び出されます。この関数は、レコードからデータをデコードしてログに記録します。CloudWatchログでアウトプットを見ることができます。

 

 

まとめ

Kinesis Data Streamsのイベントを処理する Lambda Functionを作成してみました。Real-time data analyticsなどの多くのシナリオでKinesisデータストリームを使用できます。Kinesis Data Streams は、さまざまなデータストリーミングの問題解決に使用できます。

Reference : Lambda function to consume events from a Kinesis stream

 

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.